home *** CD-ROM | disk | FTP | other *** search
/ Language/OS - Multiplatform Resource Library / LANGUAGE OS.iso / parallel / communic.c < prev    next >
C/C++ Source or Header  |  1992-04-11  |  15KB  |  476 lines

  1. /*-------------------------------------------------------------------*/
  2. /*----Communications controller between tuple server and client      */
  3. /*----processes. Execed by the server, and uses rexec to start       */
  4. /*----clients.                                                       */
  5. /*----                                                               */
  6. /*----Written by James Pinakis 7/12/89                               */
  7. /*----Upgraded by Geoff Sutcliffe 11/4/90                            */
  8. /*----Completely re-written by Geoff Sutcliffe 24/8/90               */
  9. /*-------------------------------------------------------------------*/
  10. #include <stdio.h>
  11. #include <string.h>
  12. #include <fcntl.h>
  13. #include <ctype.h>
  14. #include <sys/types.h>
  15. #include <sys/time.h>
  16. #include <sys/timeb.h>
  17. #include <netdb.h>
  18.  
  19. #define BUFFER_SIZE 4096
  20. #define MAX_CLIENT_MACHINES 30
  21. #define MAX_STRING_LENGTH 15
  22. #define ERROR -1
  23. #define SOCKET_EOF -2
  24. #define OK 1
  25. #define COMPLETE 0
  26. #define NULL_DESCRIPTOR -1
  27.  
  28. /*----Enuerated type for each of the types of messages that may      */
  29. /*----appear                                                         */
  30. typedef enum 
  31.     {
  32.     machines,
  33.     start,
  34.     stop,
  35.     forward,
  36.     exit_communicator,
  37.     eof,
  38.     blank_line,
  39.     error
  40.     } request_type;
  41.  
  42. typedef enum
  43.     {
  44.     read_ok,
  45.     read_eof,
  46.     read_blank_line,
  47.     read_error
  48.     } read_result;
  49.  
  50. typedef char string_type[MAX_STRING_LENGTH];
  51.  
  52. typedef char buffer_type[BUFFER_SIZE];
  53.  
  54. /*----Structure for holding logical and actual machine name, and the */
  55. /*----descriptor for writing and read to/from them                   */
  56. typedef struct
  57.     {
  58.     string_type client_machine_name;
  59.     string_type machine_name;
  60.     int descriptor;
  61.     } machine_table_entry_type;
  62.  
  63. typedef machine_table_entry_type machine_table_type[
  64. MAX_CLIENT_MACHINES];
  65. /*-------------------------------------------------------------------*/
  66. /*----Get the arguments to the communicator. There should be two     */
  67. /*----integers - the descriptor for writing to and that for reading  */
  68. /*----from.                                                          */
  69. void get_argument_values(number_of_arguments,arguments,
  70. write_descriptor,read_descriptor)
  71. int number_of_arguments,*write_descriptor,*read_descriptor;
  72. char *arguments[];
  73. {
  74. int i1;
  75.  
  76. if (number_of_arguments != 5) 
  77.     {
  78.     (void) fprintf(stderr,"%s: incorrect no. of arguments\n",
  79. arguments[0]);
  80.     for (i1=0;i1<number_of_arguments;i1++)
  81.         (void) fprintf(stderr,"Argument %d is %s\n",i1,arguments[i1]);
  82.     exit(1);
  83.     }
  84. *write_descriptor = atoi(arguments[1]);
  85. *read_descriptor = atoi(arguments[2]);
  86. #ifdef DEBUG
  87. (void) fprintf(stderr,"Write_descriptor is %d, Read_descriptor is %d\n",
  88. *write_descriptor,*read_descriptor);
  89. #endif
  90. }
  91. /*-------------------------------------------------------------------*/
  92. /*----Initialise the use of sockets, using tcp protocol              */
  93. void initialise_sockets(server_pointer,port_number,read_template)
  94. struct servent **server_pointer;
  95. int *port_number;
  96. fd_set *read_template;
  97. {
  98. *server_pointer = getservbyname("exec","tcp");
  99. if (*server_pointer == NULL) 
  100.     {
  101.     perror("exec - unknown service");
  102.     exit(1);
  103.     }
  104. *port_number = (*server_pointer)->s_port;
  105. FD_ZERO(read_template);
  106. }
  107. /*-------------------------------------------------------------------*/
  108. /*----Put a logical and absolute machine name into the machine table.*/
  109. /*----A null descriptor is assigned to the machine.                  */
  110. void load_machine_table(machine_table,entry_number,client_machine_name)
  111. machine_table_type machine_table;
  112. int entry_number;
  113. char *client_machine_name;
  114. {
  115. char *cp1;
  116.  
  117. #ifdef DEBUG
  118. (void) fprintf(stderr,"Loading machine %s at position %d\n",
  119. client_machine_name,entry_number);
  120. #endif
  121. (void) strcpy(machine_table[entry_number].client_machine_name,
  122. client_machine_name);
  123. cp1 = strchr(client_machine_name,'(');
  124. *cp1 = '\0';
  125. #ifdef DEBUG
  126. (void) fprintf(stderr,"Absolute machine name is %s\n",
  127. client_machine_name);
  128. #endif
  129. (void) strcpy(machine_table[entry_number].machine_name,
  130. client_machine_name);
  131. machine_table[entry_number].descriptor = NULL_DESCRIPTOR;
  132. }
  133. /*-------------------------------------------------------------------*/
  134. /*----Find the index of a given logical machine name in the machine  */
  135. /*----table.                                                         */
  136. int entry_lookup(machine_table,client_machine_name)
  137. machine_table_type machine_table;
  138. char *client_machine_name;
  139. {
  140. int entry_number;
  141.  
  142. for (entry_number = 0;entry_number<MAX_CLIENT_MACHINES;entry_number++)
  143.     if (!strcmp(client_machine_name,machine_table[entry_number].
  144. client_machine_name))
  145.         return(entry_number);
  146. (void) fprintf(stderr,"Machine table does not hold %s\n",
  147. client_machine_name);
  148. return(ERROR);
  149. }
  150. /*-------------------------------------------------------------------*/
  151. /*----Remove unwanted spaces from the buffer                         */
  152. void tidy_buffer(buffer)
  153. char *buffer;
  154. {
  155. char *new_buffer;
  156.  
  157. new_buffer = buffer;
  158. while (*buffer != '\0')
  159.     {
  160.     if (*buffer != ' ')
  161.         *new_buffer++ = *buffer;
  162.     buffer++;
  163.     }
  164. *new_buffer = '\0';
  165. }
  166. /*-------------------------------------------------------------------*/
  167. /*----Read an \n terminated string from a descriptor, into a buffer. */
  168. read_result read_string(buffer,maximum_size,descriptor)
  169. char *buffer;
  170. int maximum_size,descriptor;
  171. {
  172. int this_read,number_read;
  173.  
  174. #ifdef DEBUG2
  175. fprintf(stderr,"Input on %d\n",descriptor);
  176. #endif
  177. number_read = 0;
  178. while (((this_read = read(descriptor,buffer,1)) > 0) && 
  179. (number_read < maximum_size) && (*buffer != '\n'))
  180.     {
  181. #ifdef DEBUG2
  182.     (void) fprintf(stderr,".%c",*buffer);
  183. #endif
  184.     number_read++;
  185.     buffer++;
  186.     }
  187. if (this_read == 0)
  188.     return(read_eof);
  189. if (this_read < 0)
  190.     return(read_error);
  191. if (number_read > maximum_size)
  192.     {
  193. #ifdef DEBUG
  194.     (void) fprintf(stderr,"Input too long in read_string\n");
  195. #endif
  196.     return(read_error);
  197.     }
  198. if (number_read == 0)
  199.     return(read_blank_line);
  200. /*----Replace \n by \0                                               */
  201. #ifdef DEBUG
  202. (void) fprintf(stderr,"Length %d\n",number_read);
  203. #endif
  204. *buffer = '\0';
  205. return(read_ok);
  206. }
  207. /*-------------------------------------------------------------------*/
  208. /*----Read input from a descriptor, work out what kind of message it */
  209. /*----is, and fix the arguments so that they are , separated so that */
  210. /*----they can be accessed with strtok. The buffer_pointer is set to */
  211. /*----point to the first argument.                                   */
  212. request_type parse(descriptor,buffer_pointer)
  213. int descriptor;
  214. char **buffer_pointer;
  215. {
  216. request_type request;
  217. static buffer_type buffer;
  218. char *cp1;
  219.  
  220. switch (read_string(buffer,BUFFER_SIZE,descriptor))
  221.     {
  222.     case read_error:
  223.         return(error);
  224.     break;
  225.     case read_eof:
  226.     return(eof);
  227.     break;
  228.     case read_blank_line:
  229.         return(blank_line);
  230.         break;
  231.     case read_ok:
  232.         tidy_buffer(buffer);
  233. #ifdef DEBUG
  234.         (void) fprintf(stderr,"Request =%s=\n",buffer);
  235. #endif
  236.         *buffer_pointer = buffer;
  237. /*----Now to get the arguments out by themselves. If there are any */
  238. /*----arguments, then make buffer pointer to them as a string      */ 
  239.         if ((cp1 = strrchr(buffer,')')) != NULL)
  240.             {
  241.             *cp1 = '\0';
  242.             if ((cp1 = strchr(buffer,'(')) != NULL)
  243.                 {
  244.                 *buffer_pointer = cp1 + 1;
  245.                 *cp1 = '\0';
  246.                 }
  247.             else return(error);
  248.             }
  249. /*----Otherwise put a \0 on the . and buffer pointer is NULL       */
  250.         else if ((cp1 = strchr(buffer,'.')) != NULL)
  251.         {
  252.         *cp1 = '\0';
  253.         *buffer_pointer = NULL;
  254.         }
  255.             else return(error);
  256. /*----Decide on the message type */
  257.         if (strcmp(buffer,"machines") == 0)
  258.             return(machines);
  259.         else if (strcmp(buffer,"start") == 0)
  260.                 return(start);
  261.             else if (strcmp(buffer,"stop") == 0)
  262.                     return(stop);
  263.                 else if (strcmp(buffer,"forward") == 0)
  264.                         return(forward);
  265.                     else if (strcmp(buffer,"exit") == 0)
  266.                             return(exit_communicator);
  267.                         else return(error);
  268.     break;
  269.     default:
  270.     return(error);
  271.     break;
  272.     }
  273. }
  274. /*-------------------------------------------------------------------*/
  275. /*----Start up a client machine using rexec, and update the machine  */
  276. /*----table with the descriptor returned.                            */
  277. void start_machine(machine_table,buffer,port_number,read_template)
  278. machine_table_type machine_table;
  279. char *buffer;
  280. int port_number;
  281. fd_set *read_template;
  282. {
  283. char *client_machine_name,*machine_name;
  284. int descriptor,entry_number;
  285.  
  286. client_machine_name = strtok(buffer,",");
  287. if ((entry_number = entry_lookup(machine_table,client_machine_name)) !=
  288. ERROR)
  289.     {
  290.     machine_name = machine_table[entry_number].machine_name;
  291. #ifdef DEBUG
  292.     (void) fprintf(stderr,"About to rexec %s\n",machine_name);
  293. #endif
  294.     if ((descriptor = rexec(&machine_name,(u_short) port_number,
  295. "geoff","uktt,g","bin/prolog linda/linda",(int *) 0)) == ERROR) 
  296.         {
  297.         perror("Cannot rexec");
  298.         return;
  299.         }
  300.     else {
  301.         machine_table[entry_number].descriptor = descriptor;
  302.         FD_SET(descriptor,read_template);
  303. #ifdef DEBUG
  304.         (void) fprintf(stderr,"%s uses descriptor %d\n",machine_name,
  305. descriptor);
  306. #endif
  307.         }
  308.     }
  309. }
  310. /*-------------------------------------------------------------------*/
  311. /*----Close the socket for a client machine that has terminated, and */
  312. /*----set the associated descriptor in the machine table to null.    */
  313. void stop_machine(machine_table,buffer,read_template)
  314. machine_table_type machine_table;
  315. char *buffer;
  316. fd_set *read_template;
  317. {
  318. int entry_number;
  319. char *client_machine_name;
  320.  
  321. client_machine_name = strtok(buffer,",");
  322. if ((entry_number = entry_lookup(machine_table,client_machine_name)) !=
  323. ERROR)
  324.     {
  325.     FD_CLR(machine_table[entry_number].descriptor,read_template);
  326.     (void)close(machine_table[entry_number].descriptor);
  327.     machine_table[entry_number].descriptor = NULL_DESCRIPTOR;
  328.     }
  329. }
  330. /*-------------------------------------------------------------------*/
  331. void forward_message(machine_table,buffer)
  332. machine_table_type machine_table;
  333. char *buffer;
  334. {
  335. char *client_machine_name,*term;
  336. int entry_number;
  337.  
  338. client_machine_name = buffer;
  339. term = strchr(buffer,',');
  340. *term++ = '\0';
  341. if ((entry_number = entry_lookup(machine_table,client_machine_name)) !=
  342. ERROR)
  343.     {
  344.     (void) strcat(term,".\n");
  345. #ifdef DEBUG
  346.     (void) fprintf(stderr,"Sending =%s= to %s on %d, entry %d\n",term,
  347. client_machine_name,machine_table[entry_number].descriptor,entry_number);
  348. #endif
  349.     if (write(machine_table[entry_number].descriptor,term,
  350. strlen(term)) != strlen(term))
  351.         perror("Forwarding a term");
  352.     }
  353. }
  354. /*-------------------------------------------------------------------*/
  355. int serve_request(machine_table,port_number,read_template)
  356. machine_table_type machine_table;
  357. int port_number;
  358. fd_set *read_template;
  359. {
  360. fd_set detect_template;
  361. int number_of_readable_sockets,entry_number,status;
  362. static int number_of_entries = 2;  /*----0 and 1 are the server */
  363. char *buffer,*argument;
  364.  
  365. detect_template = *read_template;
  366. number_of_readable_sockets = select(FD_SETSIZE,&detect_template,
  367. (fd_set *)0,(fd_set *)0,(struct timeval *)0);
  368. if (number_of_readable_sockets < 0) 
  369.     {
  370.     perror("select");
  371.     exit(1);
  372.     }
  373. status = OK;
  374. /*----Search for input from entry number 1. 0 is write to server.   */
  375. for (entry_number = 0;status == OK && entry_number < number_of_entries;
  376. entry_number++)
  377.     if (machine_table[entry_number].descriptor != NULL_DESCRIPTOR &&
  378. FD_ISSET(machine_table[entry_number].descriptor,&detect_template)) 
  379.         {
  380. #ifdef TIME
  381. {
  382. struct timeb time_data;
  383.  
  384. ftime(&time_data);
  385. printf("=%d %d=\n",time_data.time, time_data.millitm);
  386. }
  387. #endif
  388. #ifdef DEBUG
  389.         (void) fprintf(stderr,"Input on %d\n",machine_table[entry_number].
  390. descriptor);
  391. #endif
  392.         switch (parse(machine_table[entry_number].descriptor,&buffer)) 
  393.             {
  394.             case machines:
  395.                 argument = strtok(buffer,",");
  396.                 while (argument != NULL)
  397.                     {
  398.                     load_machine_table(machine_table,number_of_entries++,
  399. argument);
  400.                     argument = strtok((char *)NULL,",");
  401.                     }
  402.                 break;
  403.             case start:
  404.                 start_machine(machine_table,buffer,port_number,
  405. read_template);
  406.                 break;
  407.             case stop:
  408.                 stop_machine(machine_table,buffer,read_template);
  409.                 break;
  410.             case forward:
  411.                 forward_message(machine_table,buffer);
  412.                 break;
  413.             case exit_communicator:
  414.                 status = COMPLETE;
  415.                 break;
  416.             case eof:
  417.                 break;
  418.             case blank_line:
  419.                 break;
  420.             case error:
  421.                 (void) fprintf(stderr,"\nBad input : %s\n",buffer);
  422.                 status = ERROR;
  423.                 break;
  424.             default:
  425.                 (void) fprintf(stderr,"Error in serve_request - \
  426. no descriptor found\n");
  427.                 status = ERROR;
  428.                 break;
  429.             }
  430.         }
  431. return(status);
  432. }
  433. /*-------------------------------------------------------------------*/
  434. int main(number_of_arguments,arguments)
  435. int number_of_arguments;
  436. char *arguments[];
  437. {
  438. int read_descriptor,write_descriptor,port_number;
  439. struct servent *server_pointer;
  440. fd_set read_template;
  441. machine_table_type machine_table;
  442.  
  443. #ifdef DEBUG
  444. int descriptor;
  445. if ((descriptor = open("comm_debug",O_CREAT|O_TRUNC|O_WRONLY,0644)) < 0)
  446.     perror("Cannot debug");
  447. else dup2(descriptor,2);
  448. (void) fprintf(stderr,"Communicator is going\n");
  449. #endif
  450.  
  451. get_argument_values(number_of_arguments,arguments,&write_descriptor,
  452. &read_descriptor);
  453. initialise_sockets(&server_pointer,&port_number,&read_template);
  454. /*----Load write to server details in position 0 of the table      */
  455. load_machine_table(machine_table,0,"server(1)");
  456. machine_table[0].descriptor = write_descriptor;
  457. /*----Load read from server details in position 1                  */
  458. load_machine_table(machine_table,1,"server(0)");
  459. machine_table[1].descriptor = read_descriptor;
  460. FD_SET(read_descriptor,&read_template);
  461. while (serve_request(machine_table,port_number,&read_template) == OK)
  462.     ;
  463. #ifdef DEBUG
  464. (void) fprintf(stderr,"Request service completed\n");
  465. #endif
  466. FD_CLR(read_descriptor,&read_template);
  467. #ifdef DEBUG
  468. (void) fprintf(stderr,"Read descriptor cleared\n");
  469. #endif
  470. machine_table[0].descriptor = NULL_DESCRIPTOR;
  471. machine_table[1].descriptor = NULL_DESCRIPTOR;
  472. return(0);
  473. }
  474. /*-------------------------------------------------------------------*/
  475.  
  476.